Flink

您所在的位置:网站首页 flink 双流join的应用场景是什么 Flink

Flink

#Flink| 来源: 网络整理| 查看: 265

8 多流转换 8.1 分流 简单实现

对流三次filter算子操作实现分流

// 筛选 Mary 的浏览行为放入 MaryStream 流中 DataStream MaryStream = stream.filter(new FilterFunction() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("Mary"); } }); // 筛选 Bob 的购买行为放入 BobStream 流中 DataStream BobStream = stream.filter(new FilterFunction() { @Override public boolean filter(Event value) throws Exception { return value.user.equals("Bob"); } }); // 筛选其他人的浏览行为放入 elseStream 流中 DataStream elseStream = stream.filter(new FilterFunction() { @Override public boolean filter(Event value) throws Exception { return !value.user.equals("Mary") && !value.user.equals("Bob") ; } }); MaryStream.print("Mary pv"); BobStream.print("Bob pv"); elseStream.print("else pv"); 使用测输出流 代码 public class SplitStreamTest { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator stream = env.addSource(new ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); //定义测输出流标签 //测输出流类型可以跟主流不同,因此换个类型 OutputTag maryTag = new OutputTag("Mary") {}; OutputTag bobTag = new OutputTag("Bob") {}; SingleOutputStreamOperator processStream = stream.process(new ProcessFunction() {//主流类型还是Event吧 @Override public void processElement(Event value, Context ctx, Collector out) throws Exception { if (value.user.equals("Mary")) { //把数据写道测输出流,第一个参数标签,第二个是输出形式 ctx.output(maryTag, Tuple3.of(value.user, value.url, value.timestamp)); } else if (value.user.equals("Bob")) { //把数据写道测输出流,第一个参数标签,第二个是输出形式 ctx.output(bobTag, Tuple3.of(value.user, value.url, value.timestamp)); } else {//其他放主流 out.collect(value); } } }); processStream.print("else"); processStream.getSideOutput(maryTag).print("Mary"); processStream.getSideOutput(bobTag).print("Bob"); env.execute(); } } 结果 else> Event{user='Alice', url='./home', timestamp=2022-11-25 21:56:01.958} else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:02.971} Mary> (Mary,./home,1669384564001) else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:05.004} Bob> (Bob,./prod?id=100,1669384566019) Bob> (Bob,./cart,1669384567024) else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:08.027} 8.2 合流 8.2.1 概述 含义

在这里插入图片描述

要求:数据类型要相同

特点:可以合并多条流

使用 stream1.union(stream2,stream3,...) 8.2.2 联合(Union) 代码 public class UnionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator stream1 = env.socketTextStream("hadoop2",7777) .map(data->{ String[] field = data.split(","); return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim())); }) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); stream1.print("stream1"); SingleOutputStreamOperator stream2 = env.socketTextStream("hadoop2",8888) .map(data->{ String[] field = data.split(","); return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim())); }) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }) ); stream2.print("stream2"); //合并两条流 stream1.union(stream2) .process(new ProcessFunction() { @Override public void processElement(Event value, Context ctx, Collector out) throws Exception { out.collect("水位线:"+ctx.timerService().currentWatermark()); } }) .print(); env.execute(); } } 结果及分析

在这里插入图片描述

图我习惯往上一层,方便对应着看

端口7777时间戳为2000,等下次事件发生由于延迟两秒,再-1,水位线为-1

但是由于端口8888的一直没数据,因此无论端口7777时间戳到哪里,水位线都是由两条流中较低的而决定,即以端口8888决定,因此水位线一直为一个很大的负数

此时如果在端口8888的窗口中输入数据,那么水位线会根据此流而变化,由于此流延迟五秒,会到6000的时候才会推动上一个时间戳5000的水位线到达-1,并与7777端口直至持平

在这里插入图片描述

当8888端口时间戳进行到7000的时候,水位线没有变成199(6000-50000-1),由于7777端口的时间戳才到-1,因此由低的流决定,显示水位线为-1

在这里插入图片描述

在7777端口输入Mary,./home,6000的推近上一个时间戳的水位线到3999(6000-2000-1),在8888端口输入Mary,./home,7000的时候也推近了7000这个时间戳的水位线,因为只需要2000毫秒就能触发,现在2000毫秒过了,就能触发,因此最后水位线显示1999

8.2.3 连接(connect) 概述

在这里插入图片描述

两个不同类型的DataStream连接(通过.connect)形成ConnectedStreams,进行算子转换后才得到DataStream

但是之前的map(),flatMap(),process()传入的都是对应的函数类处理单流数据,现在需要处理多流,会在原来的MapFunction前面加上Co,即CoMapFunction,其他的也一样CoProcessFunction,并且CoMapFuntion中方法有map1()方法和map2方法

分析

在这里插入图片描述

stream2(DataStream)调用connect后得到的是ConnectedStream

在这里插入图片描述

ConnectStream不继承DataStream了,ConnectStream的泛型分别是两个流的类型,其中有process()方法等,传入的CoProcessFunction 在这里插入图片描述

CoProcessFunction继承Function,并且有两个map的方法,分别是map1和map2传入的三个参数分别是,第一个流,第二个流,以及输出(即合流后的类型),

代码 demo public class ConnectTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource stream1 = env.fromElements(1, 2, 3); DataStreamSource stream2 = env.fromElements(4L,5L,6L,7L); stream2.connect(stream1) .map(new CoMapFunction() { @Override public String map1(Long value) throws Exception { return "Long:"+value.toString(); } @Override public String map2(Integer value) throws Exception { return "Integer:"+value.toString(); } }) .print(); env.execute(); } }

结果

Integer:1 Integer:2 Integer:3 Long:4 Long:5 Long:6 Long:7 实时对账案例

两条流,一条是app用户提交支付的请求,另一条流是第三方支付平台给我们反馈的订单支付的请求

public class BillCheckExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //来自app的支付日志 SingleOutputStreamOperator appStream = env.fromElements( Tuple3.of("order-1", "app", 1000L), Tuple3.of("order-2", "app", 2000L), Tuple3.of("order-3", "app", 3500L) ).assignTimestampsAndWatermarks(WatermarkStrategy. forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple3 element, long recordTimestamp) { return element.f2; } }) ); //来自第三方平台的支付日志 SingleOutputStreamOperator thirdpartStream = env.fromElements( Tuple4.of("order-1", "third-party", "success", 3000L), Tuple4.of("order-3", "third-party", "success", 4000L) ).assignTimestampsAndWatermarks(WatermarkStrategy. forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple4 element, long recordTimestamp) { return element.f3; } }) ); //检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警 // //这种也可以 // appStream.keyBy(data->data.f0) // .connect(thirdpartStream.keyBy(data -> data.f0)); // appStream.connect(thirdpartStream) .keyBy(data->data.f0,data-> data.f0) .process(new OrderMatchResult()) .print(); env.execute(); } //自定义实现CoFunction public static class OrderMatchResult extends CoProcessFunction{ //定义状态变量,用来保存已经到达的事件 private ValueState appEventState; private ValueState thirdPartyEventState; //运行上下文环境中获取状态 @Override public void open(Configuration parameters) throws Exception { appEventState = getRuntimeContext().getState( new ValueStateDescriptor("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG)) ); thirdPartyEventState = getRuntimeContext().getState( new ValueStateDescriptor("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)) ); } @Override public void processElement1(Tuple3 value, CoProcessFunction.Context ctx, Collector out) throws Exception { //来的是app event,看另一条流中事件是否来过 if(thirdPartyEventState.value()!=null){ out.collect("对账成功:"+value+" "+thirdPartyEventState.value()); //清空状态 thirdPartyEventState.clear(); }else{ //如果每来就等待,并且更新状态 appEventState.update(value); //注册一个5秒后的定时器,开始等待另一条的事件 ctx.timerService().registerEventTimeTimer(value.f2+5000L); } } @Override public void processElement2(Tuple4 value, CoProcessFunction.Context ctx, Collector out) throws Exception { //来的是app event,看另一条流中事件是否来过 if(appEventState.value()!=null){ out.collect("对账成功:"+appEventState.value()+" "+value); //清空状态 appEventState.clear(); }else{ //如果没来就等待,并且更新状态 thirdPartyEventState.update(value); //注册一个5秒后的定时器,开始等待另一条的事件 ctx.timerService().registerEventTimeTimer(value.f3); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { //定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来 //并且不会存在两个都不为空,因为其中一个不为空后会被清除 //没有没清空表示失败 if(appEventState.value()!=null){ out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到"); } if(thirdPartyEventState.value()!=null){ out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到"); } //清空所有数据 appEventState.clear(); thirdPartyEventState.clear(); } } } 结果 对账成功:(order-1,app,1000) (order-1,third-party,success,3000) 对账成功:(order-3,app,3500) (order-3,third-party,success,4000) 对账失败:(order-2,app,2000) 第三方支付平台信息未到 8.2.4 广播连接流(broadcast) 概述

在这里插入图片描述

DataStream调用connect()方法后可以传入BroadcastStream广播流

在这里插入图片描述

传入广播流后返回的是BroadcastConnectedStream广播连接流,用于动态实时变化定义配置的场景

在这里插入图片描述

BroadcastStream广播流通过保存成广播状态广播给下游

DataStream中有broadcast()方法,需要传入MapStateDescriotor映射状态描述器,保存成映射状态然后广播至下游,最后返回了BroadcastStream

运用

在这里插入图片描述

new MapStateDescriotor形成MapStateDescriotor,传入broadcast(),返回BroadcastStream 在这里插入图片描述

再将得到的BroadcastStream对象放入DataStream调用connect()方法中,最终得到BroadcastConnectedStream广播连接流

在这里插入图片描述

在这里插入图片描述

而后BroadcastConnectedStream也可以调用process()方法,跟之前一样可以传入KeyedBroadcastProcessFunction,里面也是两个,一个是processElement(数据流使用的)以及processBroadcastElement(广播流用的)最后返回SingleOutputStreamOperator

8.3 双流join 8.3.1 概述 两条流类型不同特殊的connect 8.3.2 窗口联结(Window Join) 分析 在这里插入图片描述

在这里插入图片描述

DataStream直接调用join方法,并得到JoinedStream

在这里插入图片描述

在这里插入图片描述

得到JoinedStream后,就可以调用where方法,where()中传入第一条流的KeySelelctor,返回Where类型

在这里插入图片描述

在这里插入图片描述

Where是JoinedStream的内部类,内部类中equalTo()传入第二条流KeySelelctor,并且返回EqualTo内部类

在这里插入图片描述

在这里插入图片描述

EqualTo内部类的方法window(),传入WindowAssigner跟之前的window函数一样了,可以传入TumblingEventTimeWindows滚动窗口以及其他的滑动以及会话窗口,最终返回的是WithWindow静态类

在这里插入图片描述

在这里插入图片描述

WithWindow中的方法就是之前窗口API能做的事情,例如apply(),然后apply()中键可以再传入FlatJoinFunction以及JoinFunction 函数 在这里插入图片描述

在这里插入图片描述

JoinFunction 参数类比CoMapFunction,方法为join联合两条流并输出OUT,FlatJoinFunction也差不多

总结 在这里插入图片描述 使用 stream1.join(stream2) .where() .equalTo() .window() .apply()

跟sql的join where xx=xx很像,上面的结果默认是inner join

橙流join绿流

在这里插入图片描述

代码 demo public class WindowJoinTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator stream1 = env.fromElements( Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; } })); SingleOutputStreamOperator stream2 = env.fromElements( Tuple2.of("a", 3000), Tuple2.of("b", 4000), Tuple2.of("a", 4500), Tuple2.of("b", 5500) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; } })); stream1.join(stream2) .where(data->data.f0) .equalTo(data->data.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new JoinFunction() { @Override public String join(Tuple2 first, Tuple2 second) throws Exception { return first+" -> "+second; } }).print(); env.execute(); } }

结果

(a,1000) -> (a,3000) (a,1000) -> (a,4500) (a,2000) -> (a,3000) (a,2000) -> (a,4500) (b,1000) -> (b,4000) (b,2000) -> (b,4000) 8.3.3 间隔联结(Interval Join) 概述

在这里插入图片描述

区间有lowerBound下届,upperBound上届

a.timestamp+lowerBound public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator orderStream = env.fromElements( Tuple2.of("Mary", 1000L), Tuple2.of("Alice", 1000L), Tuple2.of("Bob", 2000L), Tuple2.of("Alice", 2000L), Tuple2.of("Cary", 2000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; } })); SingleOutputStreamOperator clickStream = env.fromElements( new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3300L), new Event("Alice", "./prod?id=200", 3000L), new Event("Bob", "./home", 3500L), new Event("Bob", "./prod?id=2", 3800L), new Event("Bob", "./prod?id=3", 4200L) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } })); //将两条流做一个连接 orderStream.keyBy(data->data.f0) .intervalJoin(clickStream.keyBy(data->data.user)) .between(Time.seconds(-5),Time.seconds(10)) .process(new ProcessJoinFunction(){ @Override public void processElement(Tuple2 left, Event right, ProcessJoinFunction.Context ctx, Collector out) throws Exception { out.collect(right+" => "+left);//浏览记录导致订单 } }) .print(); env.execute(); } } 结果 Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,1000) Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0} => (Bob,2000) Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3} => (Bob,2000) Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,2000) Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,2000) Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,1000) Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5} => (Bob,2000) Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8} => (Bob,2000) Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2} => (Bob,2000) 8.3.4 窗口同组连接(Window CoGroup) 使用 stream1.coGroup(Stream2) .where() .equalTo() .window(TumblingEventTimeWindows.of(Time.hours(1))) .apply()

对比窗口联结

stream1.join(stream2) .where() .equalTo() .window() .apply(

把join变成了coGroup,以及JoinFunction变成CoGroupFunction

分析

在这里插入图片描述

DataStream有coGroup方法,需要传入DataStream,返回CoGroupedStreams

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

CoGroupedStreams方法中可以调用where方法得到Where类型,然后调用equalTo()方法得到EqualTo类型,然后调用window()方法指定窗口得到WithWindow类型,可以在调用apply()

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

apply()传入的是CoGroupFunction接口,也是只有一个单一抽象方法coGroup(),coGroup()方法中传入的参数,是Iterable集合类型,表示的是窗口内的一组元素(非一个)

重点:使用coGroup()方法可以实现除了内连接以外的连接,也可以实现左外连接和右外连接

代码 代码 public class CoGroupTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator stream1 = env.fromElements( Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; } })); SingleOutputStreamOperator stream2 = env.fromElements( Tuple2.of("a", 3000), Tuple2.of("b", 4000), Tuple2.of("a", 4500), Tuple2.of("b", 5500) ).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; } })); stream1.coGroup(stream2) .where(data->data.f0) .equalTo(data->data.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new CoGroupFunction() { @Override public void coGroup(Iterable first, Iterable second, Collector out) throws Exception { out.collect(first+"=>"+second); } }).print(); env.execute(); } } 结果 [(a,1000), (a,2000)]=>[(a,3000), (a,4500)] [(b,1000), (b,2000)]=>[(b,4000)] []=>[(b,5500)]


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3